package com.example.redisconsumer.controller;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.data.redis.core.StringRedisTemplate;


import javax.annotation.Resource;
import java.util.List;

@RestController
public class ConsumerController {
    @Resource
    StringRedisTemplate stringRedisTemplate;
    @GetMapping("/consumer")

    public void consumer()throws Exception{

        //实例化消费
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("consumer");
        //设置消费地址
        consumer.setNamesrvAddr("localhost:9876");
        //订阅一个或多个Topic，以及Tag过滤需要消费的消息
        consumer.subscribe("TopicTest","*");
        //注册回调实现类，来处理broker拉取回来的信息
        //消费模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //标记该消息已成功消费
                System.out.printf("%s Receive new Message2222222 %n",Thread.currentThread().getName(),msgs);
                MessageExt msg=msgs.get(0);
                String body=new String(msg.getBody());
//消费的用户 存入到userList 中
                stringRedisTemplate.opsForList().leftPush("userlist",body);
                //ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }
        });
        consumer.start();
        System.out.println("Consumer Start.%n");

    }
    //查询所有用户
    @GetMapping("/getUserList")
    public List getUserList(){
        List<String> list=stringRedisTemplate.opsForList().range("userList",0,-1) ;
        return list;
    }
    @GetMapping("/method4")

    public void method4()throws Exception{

        //实例化消费
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("consumer2");
        //设置消费地址
        consumer.setNamesrvAddr("localhost:9876");
        //订阅一个或多个Topic，以及Tag过滤需要消费的消息
        consumer.subscribe("TopicTest","*");
        //注册回调实现类，来处理broker拉取回来的信息
        //消费模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //标记该消息已成功消费
                System.out.printf("%s Receive new Message33333333333 %n",Thread.currentThread().getName(),msgs);
//                MessageExt msg=msgs.get(0);
//                String body=new String(msg.getBody());
////消费的用户 存入到userList 中
//                stringRedisTemplate.opsForList().leftPush("userlist",body);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }
        });
        consumer.start();
        System.out.println("Consumer Start.%n");

    }
}
